跳到主要内容

RocketMQ 学习01 配置环境

使用 Docker 配置环境

搭建 Rocket MQ 配置,这里使用 Docker

创建 broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 如果是本地程序调用云主机 mq,这个需要设置成 云主机 IP
# 如果 Docker 环境需要设置成宿主机IP
brokerIP1=172.26.128.1

创建一个 docker-compose.yml 文件

version: '3'
services:
namesrv:
image: apacherocketmq/rocketmq
container_name: namesrv
ports:
- 19876:9876
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
command: sh mqnamesrv
broker:
image: apacherocketmq/rocketmq
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./data/broker/logs:/home/rocketmq/logs
# - ./data/broker/store:/home/rocketmq/store
# - ./data/broker/broker.conf:/home/rocketmq/rocketmq-4.6.0/conf/broker.conf
command: sh mqbroker -n namesrv:9876 -c ./conf/broker.conf
environment:
JAVA_OPT_EXT: '-Xms256m -Xmx256m -Xmn128m'
depends_on:
- namesrv
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 38080:8080
environment:
JAVA_OPTS: -Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
depends_on:
- namesrv

各角色详情

1、Producer:消息的发送者;举例:发信者

2、Consumer:消息接收者;举例:收信者

3、Broker:暂存和传输消息;举例:邮局

4、NameServer:管理 Broker;举例:各个邮局的管理机构

5、Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息

6、Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息

集群各角色特点

NameServer 和 Broker

NameServer 是一个几乎无状态节点(因为每个 NameServer 的信息都是一样的),可集群部署,节点之间无任何信息同步,因为注册 Broker 时,Broker 会主动上报自身情况

Broker 有 Master 和 Slave 的区分(看上图),Master 主要负责写操作,就是 Producer 发送消息过来,它保存下来,而 Slave 主要负责读操作,即当 Consumer 要取消息时,去查询是否有消息。

一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。

每个 Broker 与 NameServer 集群中所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。

集群内的 Master 节点之间不做数据交互

那为什么要将这么多 broker 组织成多主多从,而不是采用一主多从然后主节点 down 机后再选举啊? 因为。。。RocketMQ 中并没有 Master 选举功能,在 RocketMQ 集群中,1 台机器只能要么是 Master,要么是 Slave,这个在初始的机器配置里面,就定死了。不会像 kafka 那样存在 Master 动态选举的功能,所以通过配置多个 Master 节点来保证 RocketMQ 的高可用。

其中 Master 的 broker id = 0,Slave 的 broker id > 0。 有点类似于 MySQL 的主从概念,Master 挂了以后, Slave 仍然可以提供读服务,但是由于有多主的存在,当一个 Master 挂了以后,可以写到其他的 Master 上。

那 Master 节点之间不做数据交又怎么理解呢?

因为配置多个 Master 节点是为了负载均衡,消息本质都是需要被消费,所以这块数据放在这个 Master,那块数据放在另一个 Master 里面并没有影响。

Broker 队列分区(微观)

在 RocketMQ 中,是基于多个 Message Queue 来实现类似于 Kafka 的分区效果。如果一个 Topic 要发送和接收的数据量非常大, 需要能支持增加并行处理的机器来提高处理速度,这时候一个 Topic 可以根据需求设置一个或多个 Message Queue。

Topic 有了多个 Message Queue 后,消息可以并行地向各个 Message Queue 发送,消费者也可以并行地从多个 Message Queue 读取消息并消费。从而提高消费者并发处理的能力与消费者拉取消息时的负载均衡

注:RocketMQ 是通过多 Master 实现了对 Producer 发送消息的负载均衡,而不是 kafka 那样通过分区分片存储实现 Producer 发消息负载均衡

可以直接在控制台创建队列(分区):

图中那两个参数是什么意思呢?

  • writeQueueNums:写队列数,表示 producer 发送到的 MessageQueue 的队列个数
  • readQueueNums:读队列数,表示 Consumer 读取消息的 MessageQueue 队列个数

注:这两个值需要相等,在集群模式下如果不相等,writeQueueNums=6,readQueueNums=3, 那么每个 broker 上会有 3 个 queue 的消息是无法消费的。

上面创建一个 TestTopic 的主题,这个主题下有两个队列,即两个分区。

为了 Producer 发来的消息每个 Master 都能写,所以每个 Broker 上都要有 TestTopic 主题,而且每个 Broker 上的 TestTopic,都要有两个分区

Producer 和 Consumer

Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 发送心跳。Consumer 既可以订阅消息,也可以从 Slave 订阅信息,订阅规则由 Broker 配置决定。

集群模式的选择

因为谈论 NameServer 和 Consumer、Producer 集群没意义,所以一般说的集群都是指 Broker

单 Master 模式

风险较高,一旦 Broker 重启或宕机,整个服务不可用

多 Master 模式

一个集群无 Slave,全是 Master

优点:配置简单,单个 Master 宕机或者重启对应用无影响,即使宕机不可恢复,消息也不会丢失(异步刷盘丢失少量消息,同步刷盘不会丢失)

缺点:单台机器宕机期间,未被消费的消息在机器恢复之前不可订阅,消息实时性会收到影响。

多 Master 多 Slave 模式(异步)

每个 Master 配置一个 Slave,有多对 Master-Slave,采用 异步复制方式,主备有短暂消息延迟

优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样

缺点:Master 宕机,磁盘损坏情况下会丢失少量消息。

多 Master 多 Slave 模式(同步)

每个 Master 配置一个 Slave,有多对 Master-Slave,采用同步双写模式,即只有主备都写成功,才向应用返回成功,

优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性非常高。

缺点:性能比异步复制模式略低,发送单个消息的 RT 会略高,主节点宕机后,备机不能自动切换为主机

双主双从集群搭建

总体架构

消息高可用采用 2m-2s(同步双写)

集群工作流程

1、启动 NameServer,起来后监听端口,等待 Broker、Producer、Consumer 连上来,NameServer 相当于一个路由控制中心,它本身并不存储数据,只是起到一个路由作用,把具体的消息传递到 Broker 上来

2、Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP + 端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。

3、收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。

4、Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。

5、Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

配置服务器环境

首先配置虚拟机,模拟这个集群环境

序号Ip角色架构模式
1192.168.211.151NameServer  BrokerServerMaster1  Slave2
2192.168.211.152NameServer BrokerServerMaster2  Slave2

修改服务器 Host 信息

vim /etc/hosts

配置如下:

# nameserver
192.168.211.151 rocketmq-nameserver1
192.168.211.152 rocketmq-nameserver2
# broker
192.168.211.151 rocketmq-master1
192.168.211.151 rocketmq-slave1
192.168.211.152 rocketmq-master2
192.168.211.152 rocketmq-slave2

配置完成,重启网卡:

systemctl restart network
# Ubuntu 直接重启吧
sudo reboot

安装 RocketMQ

安装 JDK 以及配置 JAVA_HOME 参考:https://linuxhint.com/install_jdk_14_ubuntu/

sudo apt install openjdk-8-jdk
java -version
sudo update-alternatives --config java
# 查看输出来的地址
sudo vim /etc/environment

# 刷新
source /etc/environment

下面配置 RocketMQ

wget https://ftp.jaist.ac.jp/pub/apache/rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip
sudo unzip rocketmq-all-4.9.0-bin-release.zip

cd ./rocketmq-all-4.9.0-bin-release/bin

# ========== 启动 NameServer ==========
# 这里把日志输出到指定位置上(得先创建这个文件)
nohup sh mqnamesrv >> /root/log/nameserver.out 2>&1 &

# 或者
# 如果不想输出这个日志,可以直接
nohup sh mqnamesrv &
# 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log

# ========== 启动 Broker ==========
nohup sh mqbroker -n localhost:9876 &
# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log

如果因为内存不足启动失败,需要编辑如下两个文件

# 修改默认 JVM 大小
vi runbroker.sh
vi runserver.sh
# 两个文件的这里:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 改成:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -xx:MetaspaceSize=128m -xx:MaxMetaspaceSize=320m"

关闭 rocketmq

# 关闭 NameServer
sh mqshutdown namesrv
# 关闭 Broker
sh mqshutdown broker

配置环境变量

vim /etc/profile

在 profile 文件的末尾加上

ROCKETMQ_HOME=/root/rocketmq-all-4.9.0-bin-release/
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH

保存后

source /etc/profile

创建消息存储路径

新建相关目录,Broker 日志存储目录,由于主从在一个服务器搭建,所有要建立两个不一样的目录,否则从节点起不来

# 其实 mkdir 可以使用 -p 参数自动创建不存在的父目录
mkdir -p "/root/local/rocketmq/store-a"
mkdir -p "/root/local/rocketmq/store-b"

Broker 配置文件

其实 RocketMQ 提供了配置模板

因为是配置的双主双从模式,所以这里选择配置这两个文件

# broker 就是整个集群的名字,这里双主双从都是这个名字,所以不用变
brokerClusterName=DefaultCluster

#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a

#nameServer地址,分号分割(这里的 rocketmq-nameserver1 就是上面修改 host 文件的那个地址)
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# 0 表示 Master,>0 表示 Slave
brokerId=0

# 删除一些不用的文件的时间点,这里是凌晨 4点
deleteWhen=04

# 文件保留时间,这里是 48 小时
fileReservedTime=48

#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER

#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false
# 发消息线程池数量
#sendMessageThreadPoolNums=128
# 拉消息线程池数量
#pullMessageThreadPoolNums=128

# Broker 对外服务的监听端口
listenPort=10911

# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true


#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000


#存储路径
storePathRootDir=/root/local/rocketmq/store-a/storePathRootDir
#commitLog 存储路径
storePathCommitLog=/root/local/rocketmq/store-a/storePathCommitLog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/local/rocketmq/store-a/storePathConsumeQueue
#消息索引存储路径
storePathIndex=/root/local/rocketmq/store-a/storePathIndex
#checkpoint 文件存储路径
storeCheckpoint=/root/local/rocketmq/store-a/storeCheckpoint
#abort 文件存储路径
abortFile=/root/local/rocketmq/store-a/abortFile

然后把这些粘贴到对于的配置文件中

先配置这个 broker-a.properties

再配置这个 broker-b-s.properties 其它的和上面没有区别,唯一值得注意的是 brokerRole 需要改成从节点,以及改下路径

# 别忘了改这个 brokerName
brokerName=broker-b
# 设置为从(>0)
brokerId=1
brokerRole=SLAVE

listenPort=11011

#存储路径(注意!!! 这个目录很关键,不能瞎写,不然会无法启动)
storePathRootDir=/root/local/rocketmq/store-b/storePathRootDir
#commitLog 存储路径
storePathCommitLog=/root/local/rocketmq/store-b/storePathCommitLog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/local/rocketmq/store-b/storePathConsumeQueue
#消息索引存储路径
storePathIndex=/root/local/rocketmq/store-b/storePathIndex
#checkpoint 文件存储路径
storeCheckpoint=/root/local/rocketmq/store-b/storeCheckpoint
#abort 文件存储路径
abortFile=/root/local/rocketmq/store-b/abortFile

同理配置 RocketMQ-02 主机,配置方式差不多,只不过是配置这两个文件:

注意,上面配置文件还需要改端口号(因为同个主机跑了两个 Broker 服务,所以需要让它们的端口不一样)

192.168.211.151
broker-b-s.properties => listenPort=11011
broker-a.properties => listenPort=10911

192.168.211.152
broker-a-s.properties => listenPort=11011
broker-b.properties => listenPort=10911

修改启动脚本文件

runbroker.sh 需要根据内存大小进行适当的对 JVM 参数进行调整

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

runserver.sh 同理

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

服务启动

启动 NameServe 集群

分别在192.168.211.151和192.168.211.152启动 NameServer

nohup sh mqnamesrv >> /root/local/rocketmq/nameserver.out 2>&1 &

检查日志文件:

如果报错:可能端口被占用了

java.net.BindException: Address already in use

nameserver 端口默认是 9876

# 检查端口
lsof -i :9876
kill PID

再次启动:

如果占用了端口,说明启动成功

启动 Broker

在192.168.211.151上启动master1和slave2的Broker

# master1:
nohup sh mqbroker -c /root/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-a.properties &
# slave2:
nohup sh mqbroker -c /root/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &

在192.168.211.152上启动master2和slave2的Broker

# master2:
nohup sh mqbroker -c /root/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-b.properties &
# slave1:
nohup sh mqbroker -c /root/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &

最后使用 jps(JDK 带的检查 java 进程的命令)命令检查服务是否全部启动

启动时的错误处理

如果出现(其实不管他,直接回车也可以):

nohup: ignoring input and appending output to 'nohup.out'

是因为使用 nohup 会产生日志文件,默认写入到 nohup.out

nohup sh mqbroker -c /root/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-a.properties >> /root/local/rocketmq/store-a/broker.out 2>&1 &
# 或者,将 nohup 的日志输出到 /dev/null,这个目录会让所有到它这的信息自动消失
# nohup sh mqbroker -c /root/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-b-s.properties </dev/null &>/dev/null &

sh mqbroker -c /root/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-a.properties
sh mqbroker -c /root/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-b-s.properties

如果报错:

java.lang.RuntimeException: Lock failed,MQ already started

启动的时候一定不要忘了 -c 才能指定配置文件启动,否则会造成两个broker冲突,起不来,因为 storePathRootDir 冲突了,会导致第二个起不来,启动日志报错

需要配置 storePathRootDir = /some/new/path/xxx 这个路径主从的不能一样

#存储路径(注意!!! 这个目录很关键,不能瞎写,不然会无法启动)
storePathRootDir=/root/local/rocketmq/store-b/storePathRootDir
#commitLog 存储路径
storePathCommitLog=/root/local/rocketmq/store-b/storePathCommitLog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/local/rocketmq/store-b/storePathConsumeQueue
#消息索引存储路径
storePathIndex=/root/local/rocketmq/store-b/storePathIndex
#checkpoint 文件存储路径
storeCheckpoint=/root/local/rocketmq/store-b/storeCheckpoint
#abort 文件存储路径
abortFile=/root/local/rocketmq/store-b/abortFile

最后检查是否启动成功

# 检查端口
lsof -i :10911
lsof -i :11011

如果错误没有抛出错误,它的日志在这里

集群监控平台的搭建

RocketMQ有一个对其扩展的开源项目 incubator-rocketmq-externals,这个项目中有一个子模块叫 rocketmq-console,这个便是管理控制台项目了

注意:这个控制台只需配置一个就行了

下载并编译打包(网络问题的话,可以手动下载)

git clone https://github.com/apache/rocketmq-externals
cd rocketmq-console
mvn clean package -Dmaven.test.skip=true

这里同样因为网络问题,就使用 IDEA 自带的 Maven 工具打包了

注意:打包前在 rocketmq-console 中配置 namesrv 集群地址:

rocketmq.config.namesrvAddr=192.168.211.151:9876;192.168.211.152:9876

把打好的 jar 包丢 Linux 里面

启动 rocketmq-console:

nohup java -jar rocketmq-console-ng-2.0.0.jar >> ./rocketmqConsole.log 2>&1 &

访问控制台

http://192.168.211.151:8080/#/